Spark Python API

基础类型

数据类型

DataType是具体数据类型的基类。

类型Python类型说明
BooleanTypebool
ByteType
IntegerTypeintShortType/LongType
DoubleType/FloatTypefloat
DecimalType()precision,scale参数控制精度;
StringTypestr
DateTypedt.date
TimestampTypedt.datetime
NullTypeNone
BinaryType
ArrayType(EType)List[Type]nullable=True
MapType(KType,VType)dict必须声明确定的keyvalue类型(命名元组)。
nullable=True
StructType([fields])dictStructType包含固定字段;而MapType可以有任意数量的key-value。
StructField(name,DType)nullable=True

Data Types - Spark 3.2.0 Documentation (apache.org)

数据结构

底层API

from pyspark import SparkContext, SparkConf
conf = SparkConf().setAppName("AppName").setMaster(master).set(...)
sc = SparkContext(conf=conf)

master表示集群的URL,或者local[k]

交互式环境(PySparkShell)中有一个默认的SparkContext对象sc

输入

文件

lines = sc.textFile("examples/src/main/resources/people.txt")

默认文件传输协议为file://使用Spark-on-Yarn时,默认的文件传输协议为hdfs://,即文件应该存储在HDFS集群上。如果要在Client模式下使用主程序所在节点的文件,显式指定协议为file://

省略文件传输协议,且路径非/开头,则表示使用相对路径。

local:/不是有效的文件传输协议(仅适用于Spark集群自身分发依赖库)。

变换

map()

输出

first()

Spark SQL

Spark会话

from pyspark.sql import SparkSession
spark=SparkSession\
        .builder\
        .master("yarn")
        .appName("PythonApp")\
        .config("hive.metastore.uris", "thrift://hadoop-master:9083")\
        .enableHiveSupport()\
        .getOrCreate()
# 如果能定位SPARK_HOME下的配置文件,则可获得相应配置,否则需要通过代码指定
spark.sparkContext.setLogLevel('WARN') # set log level to WARN after then

数据类型

输入数据以pyspark.sql.DataFrame表示。DataFrame相当于是基于Row组织的RDD,可与RDD相互转换。

Spark DataFrame

spark.createDataFrame(data[,schema][,samplingRatio],verifySchema=True)

基于Row序列(本地或分布式RDD类型)创建,

data = [Row(a=1, b=2., c='string1', d=date(2000, 1, 1)),
        Row(a=2, b=3., c='string2', d=date(2000, 2, 1)),
        Row(a=4, b=5., c='string3', d=date(2000, 3, 1))]
# data = spark.sparkContext.parallelize(data) -> RDD
df = spark.createDataFrame(data)  # 自动推测数据类型

基于Python序列(本地或分布式RDD类型)创建,由于序列没有列名信息,需要指定schema

data = [(1, 2., 'string1', date(2000, 1, 1)),
        (2, 3., 'string2', date(2000, 2, 1)),
        (4, 5., 'string3', date(2000, 3, 1))]
# data = spark.sparkContext.parallelize(data) -> RDD
df = spark.createDataFrame(data, schema='a long, b double, c string, d date')
Spark Row类型

Row表示Spark DataFrame中的一行数据,可以使用字典、元组的方式访问其元素,也可以将元素名称作为成员名来访问该元素(类似于命名空间成员的访问方式)。

from pyspark.sql import Row
row = Row(name="Alice", age=11)
'name' in row
row.age, row['name'], row[1]
Person = Row("name", "age")
Person("Alice", 11)  # Row(name='Alice', age=11)
数据格式声明

schema可以以字符串形式简单描述(类型可省略,根据数值采样自动推测)或使用StructType完整描述。

schema = StructType([StructField("a", StringType(), True),
                     StructField("b", DoubleType(), False),
                     StructField("c", StringType(), True),
                     StructField("d", DateType(),   False)])

还可以基于pandas.DataFrame创建。由于pandas.DataFrame已知数据类型,无需指定Schema。

pd.DataFrame(pandas_df)

读取文件

reader:DataFrameReader = spark.read:获取读取接口,可通过以下方式对该接口进行配置:reader.format(<FORMAT>) ->reader.<FORMAT>:支持的文件格式包括Text、CSV、Parquet、OCR、JSON等。

df = spark.read.json(FILE_PATH)
df = spark.read.format("JSON").load(FILE_PATH)

reader.option(key, value)/options(**options):设置输入选项;

csv读取参数
  • 数据格式

    • schemaStructType或文本col0 INT, col1 DOUBLE
    • header=False
    • inferSchema=False:推断数据格式,需要读取两次数据;
    • samplingRatio=1.0:推断数据格式所需读取的记录比例;
    • enforceSchema=True:默认强制应用指定的或推断的数据格式被数据源所有文件,CSV文件中的头部被忽略;反之,仅验证CSV文件的头部字段(推荐禁用该选项以避免意外错误)。
    • maxColumns=20480:限制读取的列数;
    • multiLine=False:数据记录跨行;

    csv不支持仅读取部分数据,可在读取后执行limit()方法返回部分数据

  • 文本编码:encoding='UTF-8'

  • 特殊字符:

    • sep=','
    • quote='"':用于包含分隔符的字段;
    • escape='\':用于转义已经使用引号的字段中的引号,即\"
    • charToEscapeQuoteEscape='\';用于转义引号字段中的转义字符,即\\
    • comment=None(数据中的注释行开头字符);
    • ignoreLeadingWhiteSpace/ignoreTrailingWhiteSpace=False:忽略空白;
  • 特殊值:

    • nullValue='':文件中的缺失值形式;
    • emptyValue='""':文件中空字符串的形式;
    • nanValue='NaN':文件中无效数值的表示形式;
    • positiveInf/negativeInf='Inf':文件中无穷大的表示形式;
  • 数值格式:

    • dateFormat='yyyy-MM-dd':文件中日期表示形式;
    • timestampFormat="yyyy-MM-dd'T'HH:mm:ss.SSSXXX":文件中时间表示形式;
    • maxCharsPerColumn=-1:限制每个字段读取字符数量(默认无限制);
  • 错误处理:

    • mode='PERMISSIVE':发现坏记录将其存储在名为columnNameOfCorruptRecord的列(需要用户在数据格式中设置该列,否则丢弃该列),并将其他列设置为null;当记录字段比数据格式的列少,则缺少的列设置为null;反之,丢弃多余的列。DROPMALFORMED忽略整条坏记录。FAILFAST直接抛出异常。
    • columnNameOfCorruptRecord=spark.sql.columnNameOfCorruptRecord

Hive

df = spark.sql("select * from pokes limit 10")
Hive数据源格式

CSV文件:为了保证Spark能正确推测Hive数据的数据类型,Hive数据源的文件存储中不要包含表头(Spark不识别Hive的表格选项skip.header.line.count),否则Spark将表头视为数据,由于表头为字符串类型,导致自动推导数据类型失败。对于具有表头的数据文件,可直接存储在HDFS上,并通过Spark提供的CSV文件读取接口读取数据。

数据表视图

df.createOrReplaceTempView("people") # createTempView(name)
df = spark.sql("SELECT * FROM people")
df.createGlobalTempView("people")    # createOrReplaceGlobalTempView()
df = spark.sql("SELECT * FROM global_temp.people")
df = spark.table('global_temp.people')

DataFrame API

df.cache():持久化数据(MEMORY_AND_DISK);df.persist([storageLevel])设置持久化存储等级;

df.unpersist([blocking]):释放持久化存储资源;

df.coalesce(numPartitions):重新分片;

df.withColumnRenamed(existing, new):重命名列;对于为暂未计算的抽象列Column调用其alias方法修改随后返回的数据的列名。

df.columns:返回列名组成的列表。

df.dtypes->List[(name,type)]

df.schema->StructType

df.isStreaming:该数据集是否是流数据;

df.rdd->RDD(List[Row])

df.toJSON(use_unicode=True)->RDD(List[str])

变换

  1. df.select(col:Column,...):从DataFrame选择列并执行变换,select执行的操作类似于map。可以提供序列类型或可变长参数列表作为参数。Column类用于表示==基于列的变换过程的声明式对象==,包括以下声明方式:

    • 'col_name'|col/column(col_name)|df['col_name']|df.col_name:使用列名读取该列不做其他变换;仅提供列名默认引用当前查询的数据集的列;

      from pyspark.sql.functions import col,column  # col<->column
      

      可使用df.columns[i:j]选择数据的一个分片。

      对于结构数据字段可通过路径对象来返回嵌套字段值:

      df.select("name.firstname","name.lastname").show(truncate=False)
      
    • df.col_name+1:基于列的数值运算、逻辑运算(+,-,*,/...)等;

      参与运算的数值不能是numpy类型的数值,否则会出错:'numpy.int32' object has no attribute '_get_object_id';应该将此类型转换为兼容的Python内置类型。

    • df.col_name.func()df['col_name'].func()col('col_name').func():使用列名调用内置的变换方法

    • sqlfunc(col('col_name'))sqlfunc(df['col_name']):使用SQL函数库或或自定义变换函数。SQL函数可能不接受字符串列名。

      from pyspark.sql.functions import sqlfunc; # 从内置SQL函数库导入变换方法
      pysaprk.sql.functions import udf;          # 通过udf,用户可自定义变换函数
      
    • expr("EXPR"):由于表达式文本在运行时构造,这种方式==可动态生成查询语句==;

      from pyspark.sql.functions import expr
      df.select(expr('a*2+1'))
      
  2. df.selectExpr(*expr):使用表达式代文本替df.select()的列声明(df.select('col_name')的扩展,等效于df.select(expr("EXPR"))),这种方式无法通过列声明对象调用内置方法(如列重命名)。

    df.selectExpr("age * 2", "abs(age)")
    
  3. SQL查询语句:通过构造数据表视图并利用spark.sql(...)方法传入包含SQL变换方法的原生SQL查询语句进行数据变换。

    df.createOrReplaceTempView('data')
    spark.sql('SELECT a, FUNC(a) FROM data').show()
    
  4. df.transform(func)func可包含一系列select变换;

  5. 条件变换:when相当于是一个条件选择函数的简化形式。

    from pysaprk.sql.functions import when
    df.select(when(df.col==1,df.col+1).otherwise(0).alias("result"))
    

通过列声明调用变换方法

Column.cast/astype(TYPE):列数据类型变换,TYPE可以是文本类型描述或DataType的子类对象。

Column.alias/name:修改列名,默认列名为该列的变换表达式;

Column.asc/asc_nulls_first/asc_nulls_last/desc/desc_nulls_first/desc_nulls_last(col):参考对应的SQL函数;

Column.bitwiseAND/bitwiseOR/bitwiseXOR

Column.between(lower,upper):判断列值是否在上下界之间;

Column.startswith/endswith(other)

Column.contains(other)

Column.isin(*cols):当前列的值是否在其他列中;

Column.dropFields(*fields)/getField(name)/withField(name,value):提取/丢弃/修改StructType字段;

Column.getItem(idx_or_key):从序列或字典中提取元素或字段;

Column.eqNullSafe(other)NaN = NaN returns true.

Column.isNull/isNotNull()

Column.like/rlike(other):模糊匹配/正则匹配;

Column.substr(startPos,length):获取子串(functions.substring);

SQL变换函数

Sspark定义了大量内置的变换函数以及自定变换函数的接口。

import pyspark.sql.functions as sqlfunc
自定义变换函数
from pyspark.sql.functions import udf 
@udf(returnType=ArrayType(StringType()))
def str_2_array(x: str):
    if x is None: return []
    elif x.startswith('[') and x.endswith(']'): return eval(x)
    else: return [x]
df.select(str_2_array(df.a)).show()
Pandas变换函数
from pyspark.sql.functions import pandas_udf
@pandas_udf(returnType='long', functionType=None)
def pandas_plus_one(series: pd.Series) -> pd.Series:
    return series+1
df.select(pandas_plus_one(df.a)).show()
def filter_func(iterator):
    for pdf in iterator:  # iterator over pandas DataFrames
        yield pdf[pdf.id == 1] # return iterator of pandas DataFrames
df.mapInPandas(filter_func, df.schema).show()  # [3.0]
def plus_mean(pandas_df):
    return pandas_df.assign(v1=pandas_df.v1 - pandas_df.v1.mean())
df.applyInPandas(plus_mean, schema=df.schema).show()

SparkML变换方法

SparkML提供了基于数值特征的变换方法fit/transoform(),==支持拟合变换过程中的参数;相比之下,基于select()和SQL变换函数,需要自己实现参数拟合的方法==。此外,SparkML提供的变换类,支持同时处理多个列;SQL变换函数的输入为一列,需要自己实现多列处理逻辑并记录对应参数。

SparkML的变换方法的输入要求==将每一行数据参与变换的特征列拼接为向量Vector==(SQL函数库中的array函数实现的拼接与变换方法的输入类型不一致),可以使用VectorAssembler实现此变换。

from pyspark.ml.feature import VectorAssembler
assembler = VectorAssembler(inputCols=["x", "y", "z"], outputCol="features")
x = assembler.transform(x)
some_transformer = SomeTransformer(*,inputCol='features',outputCol='_features')
model = some_transformer.fit(x)  # -> SomeTransformerModel
x = model.transform(x).drop('features').withColumnRenamed('_features', 'features')

变换后的数据包含原有数据列,以及变换后新增的features列(DenseVector)。为了方便后续分别处理各特征列的数据,需要将features[拆分为多列](# 将字典或序列转换为多列)。

@udf(returnType=ArrayType(DoubleType()))  # from pyspark.sql.functions import udf
def vector_2_array(x: DenseVector):
    return [ float(i) for i in x ]
y:SparkColumn = vector_2_array(x['features']).alias('features')
y = {col: y.getItem(i).alias(col) for i, col in enumerate(_columns)}
x = x.select(*x.columns, *y.columns)  # 此处需要处理变换后的重名列(drop or rename)

Spark 3.0 pyspark.ml模块自带vector_to_array()方法。

列变换

内置变换方法来自pyspark.sql.functions(SQL函数)和pyspark.ml.feature(变换类)等模块。

broadcast(df)

bucket(numBuckets, col)

coalesce(*cols):返回第一个不为null的列;

生成列

input_file_name():获取当前任务的文件名;

current_date/current_timestamp():返回当前日期,同一个查询中的调用返回相同值。

lit(VALUE):创建常量列;

monotonically_increasing_id()

rand(seed=None)/randn([seed]):生成[0,1)区间均匀分布/标准正态分布随机数;

sequence(col_start,col_stop[,col_step]):生成等差数列数组,输入参数为列名或常量列(lit);

spark_partition_id()

无效值处理

isnan/isnull(col):判断值是否为NaN/null

nanvl(col1, col2):如果col1!=NaN,返回col1;否则返回col2

df.fillna(value[,subset])

数值计算函数

abs/exp/expm1/sqrt/cbrt/log/log10/log1p/log2(col)/pow(x,y)cbrt三次方根,expm1->$e^x-1$;

greatest/least(*cols):返回多个列元素中的最大/小值;

factorial(col):阶乘;

ceil/floor/rint(col):近似;

round/bround(col[,scale])HALF_UP/HALF_EVEN rounding mode,整数scale控制近似精度,负数表示整数部分精度。

degrees/radians(col):弧度和角度转换;

cos/sin/tan/acos/asin/atan(col)/atan2(y,x):三角/反三角函数;

sinh/cosh/tanh/acosh/asinh/atanh:双曲/反双曲函数;

bitwise_not(col)

shiftleft/shiftright/shiftrightunsigned(col,numBits)

exists(col,f):返回指定判断函数的真值;

hypot(x,y)sqrt(a^2+b^2)

数值特征处理

MinMaxScaler:$(0,1)$规范化;

字符串计算函数

length(col):字符串或字节序列的长度;

levenshtein(left, right):两个字符串的Levenshtein距离;

sentences(string,language=None,country=None):将字符串拆分为语句数组,语句拆分为单词数组。

字符串变换

initcap/lower/upper(col)

translate(srcCol,matching,replace):对srcCol出现在matching中的字符替换为replace相应位置上的字符,例如matching='123',replace='ZYX',则替换规则为1->Z,2->Y,3->X

ascii(col):计算字符串的首字符ASCII码数值;

lpad/rpad(col,len,pad)

trim/ltrim/rtrim(col)

repeat(col, n):复制字符串n次构成新值;

reverse(col):反转字符串(或数组);

split(str,pattern,limit[3.0]):按指定模式(Java正则表达式)拆分字符串;

查找替换

instr(col,substr)/locate(substr,col,pos=0):查找子串;

substring(col,pos,len):获取子串;substring_index(str,delim,count)查找分隔符出现count次之前的子串(如果count为负数则反向查找)。

overlay(src,replace,pos,len=-1):从src的指定位置pos(位置从1开始),用replace的内容替换src内容,最大替换长度为len

regexp_extract(str, pattern, idx):抽取idx指定的捕获组,如果模式未匹配或指定捕获组未匹配,返回空字符串。

regexp_replace(str, pattern, replacement)

编码

base64/unbase64(col):BASE64编码/解码;

crc32(col):计算二进制序列的CRC32校验值返回bigint

hash/xxhash64(*col):计算输入列元素的HASH整数值/长整数值;

md5/sha1(col)/sha2(col,numBits):返回输入列的MD5/SHA-1/SHA-2xx十六进制编码字符串;

bin(col):二进制数据的字符串表示;

hex/unhex(col):字符串/整数的十六进制字符串表示(字符串每个字符的ASCII码值映射为十六进制);

conv(col,fromBase,toBase):字符串表示的数值进行进制转换;

decode/encode(col,charset)使用指定编码方法将字节序列解码为字符串(将字符串编码为字节序列);

concat(*cols)/concat_ws(sep,*cols):将多个列字符串/字节序列拼接为新数据;

format_string(format, *cols)printf模式输出新列;

日期计算函数

year/quarter/month/hour/minute/second(col)

add_months(start,months)/date_add/date_sub(start,days)

next_day(date, dayOfWeek):返回下一个是指定DayOfWeek的日期(“Mon”, “Tue”, “Wed”, “Thu”, “Fri”, “Sat”, “Sun”)

trunc(col_date,format)/date_trunc(format,col_timestamp):按给定格式舍弃末尾的时间值;timestamp_seconds(col)[3.1]截取时间字段到秒;

datediff(end,start):返回两个日期间相差的天数;months_between(date1,date2[,roundOff])计算两个日期间相差的月数(如果为月中同一天或最后一天返回整数,否则返回浮点数);

dayofmonth/dayofweek/dayofyear/weekofyear/last_day(col)last_day表示日期所在月的最后一天;

date_format/to_date(col,format):将date/timestamp/string类型的日期按给定格式(形如yyyy-mm-dd)转换为字符串。to_date->col.cast("date")

from_unixtime/to_timestamp/unix_timestamp(col,format=None):将时间戳(秒)转换为时间文本(to_timestamp->col.cast("timestamp")unix_timestamp输入列如果为指定则返回当前时间);

from_utc_timestamp/to_utc_timestamp(timestamp, tz):可指定时区。

排序

asc/asc_nulls_first/asc_nulls_last/desc/desc_nulls_first/desc_nulls_last(col):返回排序表达式。

df.orderBy(*cols)/df.sort(*cols,ascending=True):使用上述UDF声明排序表达式。

df.sortWithinPartitions(*cols,ascending=True)

集合类型处理函数
数组计算

array(*cols)/array_repeat(col,count):将一个或多个列拼接成数组、将一列重复构成数组;

array_join(col,delimiter[,null_replacement]):将数组拼接为字符串;

zip_with(left,right,f(l,r)):合并两个序列,合并值基于两个序列相应元素通过f计算;

flatten(col):拼接嵌套数组(指拼接第一层嵌套);

array_contains(col, value)

forall(col,f):判断数组元素是否都满足判断条件f

transform(col,f)[3.1]对数组每个元素进行f变换,返回变换后的数组;

element_at(col,idx):从数值提取指定索引的元素;

slice(x,start,length):返回子数组;

array_max/array_min(col)/array_position(col,value):查找元素;

array_sort(col)/sort_array(col,asc=True):(升序)排序;

array_remove(col,element)/array_distinct(col):移除指定元素;去除重复元素;

array_except/array_intersect/array_union(col1,col2):集合操作(差集/交集/并集);

arrays_overlap(a1,a2):判断两个数组是否存在公共元素(如果无公共元素且都包含null返回null)。

shuffle(col):随机置换序列元素。

字典计算

create_map(*cols):使用输入列构建字典,输入列分别轮流作为字典的keyvalue值;

map_concat(*cols):将多个输入的字典拼接为一个字典;

map_from_arrays(col1, col2):将两列数据分别转换为字典的keyvalue,如果输入数据为数组,则返回的字典包含多个字段;

map_zip_with(col1,col2,f(key,v1,v2))[3.1]合并两列字典,使用指定函数合并相同key对应的value

map_entries[3.0]/map_keys/map_values(col)<->map_from_entries(col):将字典的keyvalue转换为一条记录Row(key=,value=),并将所有记录拼接为数组返回(后续可通过explode将数组中的记录展开为数据表中的记录)。

transform_keys/transform_values(col,f):对字段的key/value执行f变换、返回变换后的字典;

map_filter(col, f(k,v))[3.1]过滤字典中不满足条件的字段;

结构体计算

struct(*cols):基于列名和列类型生成StructType

结构化数据处理
CSV文本处理[3.0]

from_csv(col,schema[,options]):将CSV字符串转换为一行记录;

to_csv(col[, options]):将StructType转换为CSV字符串;

schema_of_csv(col,options=None):从CSV字符串推测Schema;

JSON文本处理

from_json(col,schema,options=None):将JSON文本转换为Row/StructType对象(可再从结构体扩展为多列);使用schema_of_json(json:str,options[3.0])从==JSON文本==推测Schema并传递给from_json

get_json_object(col,path):使用JSON Path从==JSON文本==提取字段,例如"$.field"

json_tuple(col,*fields):从==JSON文本==的根节点提取一个或多个字段fields;==由于get_json_objectjson_tuple没有指定Schema,返回数据转换为字符串(默认类型)==。

to_json(col,options=None)StructTypeArrayTypeMapType转换为JSON文本;

按元素变换

df.replace(to_replace[, value, subset])

过滤

行过滤

filter(col,f):行过滤;

df.filter/where(col_expr):数据集过滤;df.toDF(*cols)基于列名过滤;

df.limit(num)->DataFrame:保留num行数据;

df.sample([withReplacement, …])

df.dropna(how='any',thresh=None,subset=None)how=any|allthresh指定null的比例,subset指定检查的列;

df.distinct()/df.drop_duplicates([subset]):保留唯一行。

列过滤

df.select(*cols)df.select(df.columns[i:j]):按列名或列编号选择列;

df.drop(*cols)df.drop(df.columns[i:j]):按列名或列编号丢弃列;

df.colRegex(colNamePattern)->Column:选择列名与模式匹配的列;

插入和扩展

插入新列

df.withColumn(colName, col):添加或替换一列数据;添加的数据列仅能为常量或基于当前DataFrame声明的列变换(从而保存分布式数据结构计算的兼容性)。

将集合类型扩展为多列
将字典或序列转换为多列

使用列的getItem()函数可按位置或键名获取集合类型的元素,从而实现一列到多列的变换。由于字典或序列长度可能不统一,导致无法合并并行处理结果,必须预先给定固定数量的列或列名。

col_names = ['a', 'b', 'c']
cols = [df['map_col'].getItem(col).alias(col) for col in col_names]
cols = [df['list_col'].getItem(i).alias(str(i)) for i,_ in enumerate(col_names)]
df_new = df.select(*cols)

如果实际已知MapType具有固定数量且相同的字段,则可以:

col_names = list(df.first().asDict()[col_name].keys())
将结构体转换为多列

由于结构体具有固定字段,所以能够并行处理:

df.select("struct_col.*")  
df.select([df['struct_col'].getField(f) for in fields])
将集合类型扩展为多行

explode/explode_outer/posexplode/posexplode_outer():将输入列(序列/字典)扩展为多行记录;其中字典的keyvalue分别映射为两列(后者保留null元素)。pos-函数为返回值增加位置编号字段(表示其在源数据中的顺序,默认为pos)。

数据集运算

df.join(other[, on, how])

df.crossJoin(other)

df.exceptAll(other):数据集列差集,返回在当前数据集但不在另一个数据集的列;

df.intersect/intersectAll(other):返回两个数据集都存在的行(intersectAll保留重复);

df.subtract(other):返回在此数据集但不在另一个数据集的行。

df.union/unionAll(other):==纵向拼接==(不去重,使用df.distinct()去重);

df.unionByName(other, allowMissingColumns=False):根据列名对应拼接;

聚合

聚合内置函数:aggregate(col, initialValue, merge[, finish])

df.agg(count_distinct(df.age, df.name).alias('c'))

分组聚合:df.groupBy(col:Column,...).agg_func()

agg(*expr)
apply(udf)
applyInPandas(func, schema)
# avg/count/max/min/mean/sum... => 存在等效的UDF

时间窗口聚合:

w = df.groupBy(window("date", "5 seconds")).agg(sum("val").alias("sum"))
col.over(window)
统计量

avg/count/max/min/mean/sum(col)

stddev/stddev_samp/stddev_pop(col):样本/总体标准差(stddev->stddev_samp);

variance/var_samp/var_pop(col):样本/总体方差=>df.cov(col1,col2)

corr/covar_pop/covar_samp(col1,col2):计算两列的相关系数/总体协方差/样本协方差;=>df.corr(col1,col2[,method])

approx_count_distinct(col[, rsd]):近似统计列的不同值数量。

count_distinct/sum_distinct(*cols):统计不相同元素的数量/求和;

累积数值运算

product(col):累计运算;

合并拼接

collect_list/collect_set(col):将列数据合并为序列或集合;

采样

first/last(col[,ignorenulls]):获取分组的第一个元素;

窗口操作

lag/lead(col[,offset,default]):返回当前行的前/后第offset行的值;

nth_value(col, offset[, ignoreNulls]):返回当前窗口中第offset行(从1开始)的值;

输出

在Driver侧输出DataFrame的数据信息:

df.printSchema()
df.explain()     # Prints the (logical and physical) plans
df.count()
df.show(vertical=False)

在Driver侧收集DataFrame数据:

df.collect()         # -> List[Row] 
df.take(num)         # -> List[Row]
df.head/tail(n=NUM)  # -> List[Row]
df.first()           # -> Row <=> df.head()
df.toPandas()        # -> pandas.DataFrame
df.to_pandas_on_spark([index_col])

注意,上述方法与df.limit()不同,后者输出未Spark DataFrame

df.foreach(f(Row))
df.foreachPartition(f(List[Row]))

df.toLocalIterator(prefetchPartitions=False):将每个分区逐次取到本地进行处理;

for part in df.toLocalIterator(prefetchPartitions=False):
   # do processing in client
   # part is a Local DataFrame

存储方法

writer=df.write->DataFrameWriter

writer.format(<source>):输出格式包括:csv,json,jdbc,parquet,...,等效调用writer.<source>

df.writeStream

CSV存储参数
  • 存储方法
    • 模式mode='append'|'overwrite'|'ignore'|'error'
    • 压缩compression=None|'bzip2'|'gzip'|'lz4'|'snappy'|'deflate':CSV读取不止解压缩方法,因此如果还要将存储数据读出来则选择不压缩输出。
  • 数据格式:
    • header=False:将数据的字段名写入文件的首行;
  • 特殊字符,参考CSV读取参数
    • escapeQuotes=True:默认将包含引号的字段使用引号包围并对其中的引号转义;
    • quoteAll=False:将所有字段使用引号包含;
  • 数值格式,参考CSV读取参数

分区

df.repartition(numPartitions, *cols)
writer.partitionedBy(days("ts"))
分区方法

years/months/days/hours(col):按天分区;

Spark SQL Guide: Getting Started - Spark 3.2.0 Documentation (apache.org)

Spark SQL API Reference — PySpark 3.2.0 documentation (apache.org)

用于从Hive读取数据生成DataFrameDataSet

Spark Pandas API

Pandas DataFrame [3.2]

Spark SQL and DataFrames - Spark 3.2.0 Documentation (apache.org)

Quickstart: Pandas API on Spark — PySpark 3.2.0 documentation (apache.org)

Pandas API on Spark User Guide — PySpark 3.2.0 documentation (apache.org)

Pandas API on Spark Reference — PySpark 3.2.0 documentation (apache.org)

Spark Streaming

可扩展、高吞吐、可容错的实时数据流处理。

Spark Streaming

内部数据流

Spark Streaming

将数据流拆分为小批次(discretized stream or DStream,a sequence of RDDs)。每一次处理程序调用时,接收到的数据在Spark上生成一个RDD对象。

image-20211221180224813

数据源:kafka、TCP socket、……

算法API:map、reduce、join、window;可将机器学习算法和图处理算法应用于数据流;

输出:文件、数据库、仪表板

流处理程序框架

以Kafka作为数据源的流处理程序框架:

sc = SparkContext(conf=conf)   # 创建Spark连接上下文
stream_context = StreamingContext(sc, batchDuration=5)  # 创建Kafka流处理上下文
# 设置Kafka消费者订阅参数
zookeeper = 'node1:2181,node2:2181,node3:2181'
group_id = 'spark-streaming-consumer'
topics = {'noah.unix': 1}
# 创建Kafka数据流: 需要根据数据源的编码格式指定对应的解码方法
kafkaStream = KafkaUtils.createStream(
   stream_context, zookeeper, group_id, topics,valueDecoder=msgpack.loads)
# 流处理过程:KafkaDStream/TransformedDStream变换方法
results = kafkaStream.map(func1).reduce(func2)...
# 输出过程
kafkaStream.foreachRDD(proc_rdd)  # 定义每一批数据的处理函数
stream_context.start()             # 启动流处理任务
stream_context.awaitTermination()  # 等待流处理结束或中断,防止程序提前退出

在上述框架中添加的普通Python程序(例如打印语句)仅会被执行一次,只有流处理相关代码才会在流数据处理每次触发执行时被执行。当流处理启动后,不能再修改处理过程。程序同一时间仅能有一个有效的StreamingContext,一个StreamingContext可以创建多个数据流。

数据源
stream_context.socketTextStream("localhost", 9999) # TCP socket数据源
stream_context.textFileStream(dataDirectory)  # python仅支持文本文件(HDFS)

Kafka连接模式

Reciever模式

上述框架采用Reciever模式。Receiver接收的数据储存在Spark执行器中,Spark流处理任务处理接收的数据。如果任务出错可能导致数据丢失,需要启用Write Ahead Logs将接收数据写到分布式存储以在必要时恢复。

Receiver要占用Spark应用的一个任务线程,因此分配给执行器的cores总数要大于1。

直连模式
kafkaStream = KafkaUtils.createDirectStream(
    kafka_context, list(topics.keys()), 
    {"metadata.broker.list": "node1:9092,node2:9092,node3:9092"},
    valueDecoder=msgpack.loads
)

流处理过程

处理过程以RDD作为处理对象,针对当前批次数据RDD执行mapreduce等分布式变换处理操作,返回TransformedDStream处理结果。

流处理过程是在执行器上分布式执行的,因此在Driver侧无法查看这些处理过程中的输出。

map(func):对当前批次数据的每一条记录执行运算并返回结果;传递给func的数据为RDD中每一条记录(普通Python对象)。

mapPartitions(func):对当前批次数据的每个分片执行map操作,传递给func的数据是一个分片。该变换的效果和map相同,返回结果都是包含每条记录的RDD。使用该方法代替map()的场景为减少变换方法需要反复执行的初始化操作。

因为无法通过数据序列化将特殊对象从Driver传递给执行器(例如数据库连接),因此需要在变换方法中反复调用初始化方法,造成较大开销。利用mapPartitions仅需为每个分片在对应的执行器上执行一次初始化。

def func_partition_map(data:Iterable):
   for data in Iterable:
      yield func_map(data)

变换函数返回一个处理后数据的迭代器可使用yield(减少创建对象最高效)、iter()或直接返回Iterable

flatMap(split_func):一对多映射,将一条记录拆分为序列,并将其转换为多条记录。

transform(trans_func):将一批数据RDD整体进行变换,可定义任意针对RDD的变换方法。可以在trans_func中将RDD转换为DataFrame

filter(func):仅返回func值为true的记录。

统计聚合

count()->TransformedDStream:返回当前批次数据的数量(标量RDD)。

countByValue():统计不同记录的数量(groupby-count),返回包含(record, num)的数据流。

reduce(redfunc):对当前批次数据执行reduce运算,并返回标量RDDredfunc应该满足结合律和交换律从而支持并行计算。

reduceByKey(func, numPartitions=None):(groupby-aggregate)对于数据结构为(key,value)的数据流,按key分组,并对value进行聚合。numPartitions指定分组任务的并行数量(即RDD的分片数量,集群模式默认值为spark.default.parallelism)。

updateStateByKey(func_update_state):数据记录内部状态维护。对于数据(key,value),针对每个key维护一个状态变量(可定义任意数据类型)。当处理一批数据时,会将每个key对应的值构成序列传递给状态更新函数,基于该值序列可计算更新状态。

def func_update_state(values, state):
   if state is None:
      # init_state
   # state_update <- values
   # state <- state + state_update
   return state
时间窗操作
Spark Streaming

窗口操作时间参数:1)windowDuration:时间窗长度;2)slideDuration:滑动时长;均为处理周期的整数倍。

window(...) :获取时间窗口中的所有数据。

countByWindow(...):获取时间窗口中的所有数据的计数。

reduceByKeyAndWindow(func,[invFunc],...,numPartitions=None,filterFunc=None):记录滑动窗口中每个批次数据的聚合结果reduceByKey,通过func将新加入窗口的批量数据的聚合结果合并到窗口聚合结果中,利用invFunc从窗口聚合结果中移除离开活动窗口的批量数据的聚合结果。如果未提供invFunc(某些合并结果可能也不支持反向移除),那么每次要对聚合窗口中的每个批次的聚合结果做合并,因此效率较低。filterFunc基于数据(key,value)进行过滤,仅对满足条件的数据聚合。

countByValueAndWindow()基于countByValue结果;reduceByWindow()基于reduce结果。

内部变换

repartition(numPartitions):将RDD重新分片。

多数据流处理

union(otherStream):和其他数据流合并。

join(otherStream,numPartitions=None):合并数据流,(K,V)+(K,W)->(K,(V,W));还可以指定合并方式:leftOuterJoin,rightOuterJoin,fullOuterJoin

cogroup(otherStream,numPartitions=None):合并数据流,将每个数据流相同键值的数据聚合为一个序列再进行拼接,(K,V1)+(K,V2)...+(K,W1)+(K,W2)+...->(K,(V1,V2,...),(W1,W2,...))

输出过程

针对当前批次数据执行输出操作(标准输出、文件、数据库、流引擎……),无返回数据。==流处理流程需要以输出过程来触发处理过程执行,否则系统会直接丢弃数据而不会执行变换处理操作==。

pprint: (num=10):输出当前批次数据RDD中前num条记录;如果RDD为序列类型则输出为序列,如果为标量,则输出标量。

saveAsTextFiles(prefix, [suffix]):输出文件名称格式prefix-TIME_IN_MS[.suffix](PythonAPI仅支持文本文件)。

foreachRDD(proc_rdd):对当前批次数据进行自定义处理。可在处理逻辑中添加输出方法。proc_rdd是==运行在Driver侧==的方法(不要将连接对象传递给执行器),因此可以将数据输出到终端、保存到Driver节点的文件系统、输出到数据库或HDFS。也可将RDD转换为DataFrame再执行输出处理。

def proc_rdd(data: RDD):
   data = rdd.map(...).reduce(...) # 进一步分布式操作
   print(data)  # 输出到driver终端
   pd.DataFrame.from_records(data).to_parquet(FILEPATH) # 输出到文件

Checkpointing容错机制

How to configure Checkpointing

性能优化

Performance Tuning

Structured Streaming

结构化流处理是基于Spark SQL引擎的可扩展、高容错流处理引擎。可以使用Dataset/DataFrame API来描述数据处理过程。

容错机制:checkpointing and Write-Ahead Logs。

处理模式:

  • micro-batch processing:最少延迟100ms(默认);
  • continuous processing:最少1ms延迟(Spark 2.3+)。

流处理程序框架

stream_df = spark\
    .readStream\
    .format("kafka")\
    .trigger(processingTime=None,...)\ # 流处理周期
    .option("kafka.bootstrap.servers", "node1:9092,node2:9092,node3:9092")\
    .option("subscribe", "tpoics")\  
    .load()
# "topic1,topic2" | "topic.*" 
query = stream_df.select(...)\
                 .writeStream.foreachBatch(batch_func)\
                 .start()
query.awaitTermination()

流处理程序必须以流式DataFrame进程传递,最后调用writeStream进行输出。

变换过程必须返回流式DataFrame,否则产生*Queries with streaming sources must be executed with writeStream.start();*

流处理周期

可设置一项触发选项,指定处理周期。

reader.trigger(processingTime='5 seconds',once=None,continuous='1 minute')

Structured Streaming Programming Guide - Spark 3.2.0 Documentation (apache.org)

Structured Streaming API Reference — PySpark 3.2.0 documentation (apache.org)

输入

可使用与批量处理相同的接口读取文件,此外还可指定流数据源(如Kafka)。

Kafka输入流数据可能是以字节序列存储,需要在接收数据后自行进行反序列化(解码)。

变换处理

由于数据流以结构化的流式DataFrame组织,因此Spark SQL的DataFrame API都适用于流数据的处理。

Kafka数据流除了数据字段value,还包含元数据字段,包括keytopicpartitionoffsettimestamptimestampTypeheaders。实际处理开始前,需要首先从value字段(例如JSON文本)中将数据字段提取出来并结构化为新的DataFrame

输出

writer.outputMode('append|complete|update')

writer.queryName('streaming_query'):指定处理流程的名称(query.name);

外部存储

参考批量处理的输出方式,仅支持append模式。

输出至Kafka

自定义处理

foreach
def proc_row(row):
   # ......
class ForeachWriter:
   def open(self, partition_id, epoch_id): # 创建到目标的writer连接
   def process(self, row): # 处理和写入数据
   def close(self, error): # 关闭writer连接
df_stream = writeStream.foreach(proc_row).start()  # Append,Update,Complete
foreachBatch
def batch_print(df: DataFrame, epoch_id: int):
    print(f"_________________ {epoch_id} ____________________")
    df.persist()    # 防止重复计算
    df.printSchema()
    df.show()
df_stream = writeStream.foreachBatch(batch_func).start() # Append,Update,Complete

不支持连续模式,使用foreach

调试输出

console
writeStream.format('console')\
           .option('numRows', 20)\
           .option('truncate', True)\
           .start()                    # Append,Update,Complete
memory
writeStream.format("memory").queryName("tableName").start()  # Append,Complete

图分析算法

GraphFrame

GraphFrame库用于在Spark上基于DataFrame表示图数据,并封装了图分析算法。

conda create -n graph -c conda-forge pyspark graphframes

在代码中引用GraphFrame库:

from graphframes import *
spark = SparkSession.builder\
        .appName('Spark Graph')\
        .config('spark.jars.packages', 'graphframes:graphframes:0.8.1-spark3.0-s_2.12')\
        .getOrCreate()

或通过命令行(spark-submitpyspark)参数添加引用:

pyspark --packages graphframes:graphframes:0.7.0-spark2.4-s_2.11 #*

*:根据实际安装的Spark版本,从Maven仓库选择对应版本的库。

通过分别表示图的节点和边的DataFrame构造图对象,通过edgesvertices访问图的边和节点。

g = GraphFrame(v, e)
g.vertices
 .filter("population > 100000 and population < 300000")
 .sort("population")

图分析算法

from_expr = "id='Den Haag'"
to_expr = "population > 100000 and population < 300000 and id <> 'Den Haag'"
result = g.bfs(from_expr, to_expr)

GraphX

MLlib

MLlib: Main Guide - Spark 3.2.0 Documentation (apache.org)

MLlib (DataFrame-based) API Reference — PySpark 3.2.0 documentation (apache.org)

Breeze, which depends on netlib-java

native BLAS such as Intel MKL, OpenBLAS, can use multiple threads in a single operation, which can conflict with Spark’s execution model.